package com.huan.filter.transformation;

import com.huan.filter.vo.Person;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Demonstrates the Flink {@code flatMap} operator: a transformation in which one input
 * element may produce zero, one, or several output elements.
 *
 * <p>Requirement: when a person's id is {@code >= 2}, emit both the name and the age;
 * otherwise emit only the name.
 *
 * @author huan.fu
 * @date 2023/9/18 - 22:43
 */
public class FlatMapApplication {

    /**
     * Builds a stream of three fixed {@link Person} elements, applies the {@code flatMap}
     * transformation described on the class, attaches a {@code print()} sink, and runs the job.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception propagated from building or executing the Flink job
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        environment.fromElements(
                        new Person(1, "张三", 20),
                        new Person(2, "李四", 25),
                        new Person(3, "王五", 30)
                )
                // flatMap: one input element, a variable number of output elements
                .flatMap(new FlatMapFunction<Person, String>() {
                    @Override
                    public void flatMap(Person person, Collector<String> collector) throws Exception {
                        // The name is emitted for every person, regardless of id.
                        collector.collect(person.getName());
                        // The age is emitted as a second element only for ids >= 2.
                        if (person.getId() >= 2) {
                            // NOTE(review): assumes getAge() never returns null here — confirm against Person.
                            collector.collect(person.getAge().toString());
                        }
                    }
                })
                .print();

        // Job name matches the operator being demonstrated (was mislabeled "map operation").
        environment.execute("flatMap operation");
    }
}